package com.rabbitmq.origin.api;

import com.rabbitmq.client.BuiltinExchangeType;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.rabbitmq.common.constants.CommonConstants.*;


/**
 * Manual demo launcher for the RabbitMQ consumer examples.
 *
 * <p>{@link #main(String[])} holds one code section per consumption scenario
 * (consuming straight from a queue, consuming through fanout / direct / topic /
 * headers exchanges, routing from an exchange into a named queue, and
 * dead-letter based delay). Every section except 3.4.2 is commented out; to try
 * another scenario, comment out the active section and uncomment the one you want.
 */
public class MqConsumerDemoTest {

    // Pool of three worker threads; only the commented-out multi-consumer scenarios submit work to it.
    static final ExecutorService executorService = Executors.newFixedThreadPool(3);

    /**
     * Starts the consumer for the currently enabled scenario (3.4.2).
     *
     * @param args command-line arguments; not read
     */
    public static void main(String[] args) {

        /*******************1. Consumers attach directly to a queue; several consumers consume the same queue*******************/
        // 1.1 Automatic acknowledgement; messages are handed out evenly, round-robin
        /*RabbitMqConsumerQueue rabbitMqConsumerQueue = new RabbitMqConsumerQueue();
        String methodName = "publishQueue";
        for (int i = 1; i <= 3; i++) {
            final int flag = i;
            new Thread(() -> {
                rabbitMqConsumerQueue.consumerQueue(QUEUE_NAME_PREFIX + methodName, flag);
            }, "ConsumerQueue-" + i).start();
        }*/

        // 1.2 Manual acknowledgement
        /*RabbitMqConsumerQueue rabbitMqConsumerQueue = new RabbitMqConsumerQueue();
        String methodName = "publishQueue";
        for (int i = 1; i <= 3; i++) {
            final int flag = i;
            new Thread(() -> {
                rabbitMqConsumerQueue.consumerQueueManualConfirmed(QUEUE_NAME_PREFIX + methodName, flag);
            }, "ConsumerQueueManualConfirmed-" + i).start();
        }*/

        // 1.3 Fetch and consume a single message
        /*RabbitMqConsumerQueue rabbitMqConsumerQueue = new RabbitMqConsumerQueue();
        String methodName = "publishQueue";
        for (int i = 1; i <= 3; i++) {
            new Thread(() -> {
                rabbitMqConsumerQueue.consumerSingle(QUEUE_NAME_PREFIX + methodName);
            }, "consumerSingle-" + i).start();
        }*/

        /*******************2. Consume messages through an exchange*******************/
        // 2.1 Consuming from a fanout exchange
        /*RabbitMqConsumerFanout rabbitMqConsumerFanout = new RabbitMqConsumerFanout();
        String methodName = "publishFanout";
        String exchangeType = BuiltinExchangeType.FANOUT.getType();
        for (int i = 1; i <= 3; i++) {
            final int flag = i;
            executorService.execute(() -> {
                // Consumer
                rabbitMqConsumerFanout.consumerFanoutManualConfirmed(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName, flag);
            });
        }*/

        // 2.2 Consuming from a direct exchange
        /*RabbitMqConsumerRouting rabbitMqConsumerRouting = new RabbitMqConsumerRouting();
        String methodName = "publishDirect";
        String exchangeType = BuiltinExchangeType.DIRECT.getType();

        for (int i = 1; i <= 3; i++) {
            final int flag = i;
            String routingKey = ROUTING_KEYS[(i-1) % 3];
            executorService.execute(() -> {
                // Consume messages for a given routing key (here each of the three consumers takes the
                // messages matching its own key; each consumer queue is bound to a single routing key)
                rabbitMqConsumerRouting.consumerRouting(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                        flag, routingKey);
            });
        }*/

        // One consumer queue bound to several routing keys
        /*RabbitMqConsumerRouting rabbitMqConsumerRouting = new RabbitMqConsumerRouting();
        String methodName = "publishDirect";
        String exchangeType = BuiltinExchangeType.DIRECT.getType();
        // Consume messages for the given routing keys (here one consumer queue is bound to several matching keys)
        rabbitMqConsumerRouting.consumerRouting(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                1, ROUTING_KEYS[0], ROUTING_KEYS[1], ROUTING_KEYS[2]);*/

        // 2.3 Consuming from a topic exchange
        /*RabbitMqConsumerTopic rabbitMqConsumerTopic = new RabbitMqConsumerTopic();
        String methodName = "publishTopic";
        String exchangeType = BuiltinExchangeType.TOPIC.getType();
        // Consume messages for the given routing keys (here one consumer queue is bound to several matching keys)
        rabbitMqConsumerTopic.consumerTopic(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                1, "#.critival", "*.topic.warn");*/

        // 2.4 Consuming from a headers exchange
        // Build the headers (original note, left unfinished: "if the consumer-side entries belong to the producer side ...")
        /*Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("param1", "param1");
        headers.put("param4", "param4");
        String methodName = "publishHeaders";
        String exchangeType = BuiltinExchangeType.HEADERS.getType();
        RabbitMqConsumerHeaders rabbitMqConsumerHeaders = new RabbitMqConsumerHeaders();
        rabbitMqConsumerHeaders.consumerHeaders(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                1, headers);*/

        /*******************3. Store messages in a named queue and consume them according to routing*******************/
        // 3.1 Fanout exchange
        /*RabbitMqConsumerExchange2Queue rabbitMqConsumerExchange2Queue = new RabbitMqConsumerExchange2Queue();
        String methodName = "publishExchange2Queue";
        String exchangeType = BuiltinExchangeType.FANOUT.getType();
        for (int i = 1; i <= 3; i++) {
            final int flag = i;
            executorService.execute(() -> {
                // Consumer: with a broadcast (fanout) exchange every consumer needs a queue of its own to
                // receive messages; if the consumers share one receiving queue the messages are shared among them
                // Several consumers sharing one queue
                *//*rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                        exchangeType, "", QUEUE_NAME_PREFIX + exchangeType + "_" + methodName, flag);*//*
                // Each consumer owning its own queue
                rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                        exchangeType, "", QUEUE_NAME_PREFIX + exchangeType + "_" + methodName + flag, flag);
            });
        }*/

        // 3.2 Direct exchange
        /*RabbitMqConsumerExchange2Queue rabbitMqConsumerExchange2Queue = new RabbitMqConsumerExchange2Queue();
        String methodName = "publishExchange2Queue";
        String exchangeType = BuiltinExchangeType.DIRECT.getType();

        // Single consumer
        rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                exchangeType, ROUTING_KEYS[0], QUEUE_NAME_PREFIX + exchangeType + "_" + methodName, 1);

        rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                exchangeType, ROUTING_KEYS[1], QUEUE_NAME_PREFIX + exchangeType + "_" + methodName, 2);

        rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                exchangeType, ROUTING_KEYS[2], QUEUE_NAME_PREFIX + exchangeType + "_" + methodName, 3);
        // Several consumers consuming the same queue according to routing key
        for (int i = 1; i <= 3; i++) {
            final int flag = i;
            // Different routing keys, same queue
            String routingKey = ROUTING_KEYS[(i - 1) % 3];
            // Same routing key, same queue
            // String routingKey = ROUTING_KEYS[2];
            executorService.execute(() -> {
                rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                        exchangeType, routingKey, QUEUE_NAME_PREFIX + exchangeType + "_" + methodName, flag);
            });
        }*/

        // 3.3 Topic exchange
        /*RabbitMqConsumerExchange2Queue rabbitMqConsumerExchange2Queue = new RabbitMqConsumerExchange2Queue();
        String methodName = "publishExchange2Queue";
        String exchangeType = BuiltinExchangeType.TOPIC.getType();
        rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                exchangeType, "#.warn", QUEUE_NAME_PREFIX + exchangeType + "_" + methodName, 1);

        rabbitMqConsumerExchange2Queue.consumerExchange2Queue(EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                exchangeType, "kinson.*.info", QUEUE_NAME_PREFIX + exchangeType + "_" + methodName, 2);*/

        // 3.4 Consume the dead-letter queue behind a topic queue to get a delayed-queue effect
        // 3.4.1 TTL set on the queue; expired messages go to the dead-letter queue
        /*RabbitMqConsumerTopicDeadLetterx rabbitMqConsumerTopicDeadLetterx = new RabbitMqConsumerTopicDeadLetterx();
        String methodName = "publishTopicQueueTtlDeadLetterx";
        String routingKey = TOPIC_ROUTING_KEYS[0];
        String exchangeType = BuiltinExchangeType.TOPIC.getType();
        // Consume messages for the given routing key (original note: one consumer queue bound to several matching keys)
        // rabbitMqConsumerTopicDeadLetterx.consumerTopicSingleMsgTtlDeadLetterx(DLX_QUEUE_NAME_PREFIX + EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
        //        DLX_QUEUE_NAME_PREFIX + routingKey, DLX_QUEUE_NAME_PREFIX + QUEUE_NAME_PREFIX + exchangeType + "_" + methodName + "1");

        rabbitMqConsumerTopicDeadLetterx.consumerTopicSingleMsgTtlDeadLetterx(DLX_QUEUE_NAME_PREFIX + EXCHANGE_NAME_PREFIX + exchangeType + "_" + methodName,
                DLX_QUEUE_NAME_PREFIX + "#", DLX_QUEUE_NAME_PREFIX + QUEUE_NAME_PREFIX + exchangeType + "_" + methodName + "3");*/

        // 3.4.2 TTL set on individual messages; expired messages go to the dead-letter queue
        RabbitMqConsumerTopicDeadLetterx deadLetterConsumer = new RabbitMqConsumerTopicDeadLetterx();
        String publisherMethod = "publishTopicSingleMsgTtlDeadLetterx";
        String baseRoutingKey = TOPIC_ROUTING_KEYS[0];
        String topicType = BuiltinExchangeType.TOPIC.getType();

        // The exchange and queue names share the "<exchange type>_<publisher method>" tail.
        String typeAndMethod = topicType + "_" + publisherMethod;
        String dlxExchange = DLX_QUEUE_NAME_PREFIX + EXCHANGE_NAME_PREFIX + typeAndMethod;

        // Alternative: bind with the "#" wildcard key and use the queue suffixed "2".
        // deadLetterConsumer.consumerTopicSingleMsgTtlDeadLetterx(dlxExchange,
        //       DLX_QUEUE_NAME_PREFIX + "#", DLX_QUEUE_NAME_PREFIX + QUEUE_NAME_PREFIX + typeAndMethod + "2");

        // Active variant: bind with the first topic routing key and use the queue suffixed "1".
        String dlxBindingKey = DLX_QUEUE_NAME_PREFIX + baseRoutingKey;
        String dlxQueue = DLX_QUEUE_NAME_PREFIX + QUEUE_NAME_PREFIX + typeAndMethod + "1";
        deadLetterConsumer.consumerTopicSingleMsgTtlDeadLetterx(dlxExchange, dlxBindingKey, dlxQueue);

    }
}

